;; This file is part of scheme-GNUnet.
;; Copyright (C) 2021 GNUnet e.V.
;;
;; scheme-GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; scheme-GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL-3.0-or-later

;; C source: src/util/client.c (not completely ported).
;; The Scheme implementation is rather different from the C implementation
;; though.
;;
;; This module allows communication between GNUnet services using stream
;; sockets.

(define-library (gnu gnunet mq-impl stream)
  (export write-envelope! handle-input! handle-output!
	  port->message-queue connect/fibers)
  (import (only (gnu gnunet mq)
		make-one-by-one-sender inject-message! inject-error!
		make-message-queue)
	  (only (gnu gnunet concurrency repeated-condition)
		make-repeated-condition await-trigger! prepare-await-trigger!
		trigger-condition!)
	  (only (gnu gnunet utils bv-slice)
		slice-bv slice-offset slice-length
		slice-readable? bv-slice/read-write
		slice/read-only)
	  (only (gnu gnunet mq envelope)
		attempt-irrevocable-sent!)
	  (only (gnu gnunet utils tokeniser)
		make-tokeniser add-from-port!)
	  (only (gnu gnunet config db)
		read-value)
	  (only (gnu gnunet utils hat-let)
		let^)
	  (only (gnu gnunet util time)
		standard-back-off time-unit:second)
	  (only (rnrs base)
		begin define let values quote apply
		assert = or and eq?
		list if lambda car /)
	  (only (rnrs arithmetic bitwise)
		bitwise-ior)
	  (only (rnrs exceptions)
		guard)
	  (only (rnrs control)
		when)
	  (only (fibers)
		spawn-fiber)
	  (only (fibers timers)
		sleep-operation)
	  (only (fibers io-wakeup)
		wait-until-port-readable-operation
		wait-until-port-writable-operation)
	  (only (fibers conditions)
		make-condition signal-condition! wait-operation)
	  (only (fibers operations)
		wrap-operation perform-operation choice-operation)
	  (srfi srfi-26)
	  (only (srfi srfi-39)
		parameterize)
	  (only (ice-9 suspendable-ports)
		current-read-waiter current-write-waiter)
	  (only (guile)
		error define* identity define-values
		EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG EAGAIN ECONNREFUSED
		EPROTOTYPE EPIPE ECONNRESET
		PF_UNIX SOCK_STREAM F_GETFD F_SETFD F_GETFL F_SETFL FD_CLOEXEC
		O_NONBLOCK AF_UNIX
		socket connect fcntl force-output close-port
		make-socket-address
		exception-args exception-kind)
	  (only (rnrs io ports)
		put-bytevector)
	  (only (ice-9 atomic)
		make-atomic-box atomic-box-set! atomic-box-ref)
	  (only (ice-9 control)
		let/ec)
	  (only (srfi srfi-1)
		memv list-ref)
	  (srfi srfi-26))
  (begin
    (define (write-envelope! output envelope)
      "Write the envelope @var{envelope} to the output port @var{output},
unless it is cancelled.  @var{envelope} may not be already sent.  This
can block and raise I/O errors, depending on the port @var{output} and
(in Guile) the current write waiter.  As such, the caller might need to
parameterise the current write waiter and install exception handlers."
      (attempt-irrevocable-sent!
       envelope
       ((go message priority)
	(assert (slice-readable? message))
	;; TODO: how does the port API react to OUTPUT being closed
	;; by the remote peer?
	(put-bytevector output (slice-bv message)
			(slice-offset message) (slice-length message))
	(values))
       ((cancelled) (values))
       ((already-sent) (error "tried to send an envelope twice"))))

    ;; TODO: maybe note that this procedure blocks?
    (define (handle-input! mq input)
      "Keep reading message from the input port @var{input}.

Feed each read message in-order to @var{mq} with @code{inject-message!}.
This procedure might inject errors by its own as usual (e.g. when
no appropriate message handler exists).  This does not include
@code{input:overly-small}, @code{input:premature-end-of-file} or
@code{input:regular-end-of-file}.

If a message with an overly small message size it its header
is encountered, return the error @code{input:overly-small type size},
where @var{type} is the message type as an integer (or @code{#f} if it
could not be determined) and @var{size} is the message size in the header.

When the end-of-file has been reached, return (not inject) the error
@code{input:regular-end-of-file} into @var{mq}.  If the end-of-file
happened while inside a (partial) message, return
@code{input:premature-end-of-file} instead.

@code{ECONNRESET} is treated as @code{input:regular-end-of-file}.
This might or might not be correct.  In case of an I/O error, TODO.

TODO closing message queues."
      (let^ ((! tok (make-tokeniser))
	     (! (handle/message bv offset length)
		(inject-message!
		 mq
		 (slice/read-only
		  (bv-slice/read-write bv offset length))))
	     (! (return/overly-small type size)
		(values 'input:overly-small type size))
	     (! (return/premature-eof)
		(values 'input:premature-end-of-file))
	     (! (return/done-eof)
		(values 'input:regular-end-of-file)))
	    ;; Prevent ‘In procedure fport_read: Connection reset by peer’
	    ;; in tests/network-size.scm.  XXX can this also happen in
	    ;; 'handle-output!'?
	    (guard (c ((and (eq? 'system-error (exception-kind c))
			    (= ECONNRESET (car (list-ref (exception-args c) 3))))
		       (return/done-eof)))
	      (add-from-port! tok input handle/message return/overly-small
			      return/done-eof return/premature-eof))))

    (define (handle-output! mq output wait!)
      "Keep sending message envelopes over the output port @var{output}.

The messages to send are taken in-order from the message queue @var{mq}.
In case of an I/O error, ???.  When the message queue is (temporarily)
empty, the thunk @var{wait!} is called.  It should return when messages
have been added to the queue.

When using guile-fibers, @var{wait!} can be implemented with
@code{await-trigger!} and by calling @code{trigger-condition!}
from the ‘message sender’ of @var{mq}.

When the port @var{output} has been closed for writing, this procedure
returns.  This is detected with the @code{EPIPE} error, so don't block
@code{SIGPIPE} signals.

TODO: detect it has been closed even when not actually writing,
with EPOLLERR -- needs fibers support."
      (define (one-by-one-proc ev)
	(write-envelope! output ev))
      (define send-round
	(cute (make-one-by-one-sender one-by-one-proc)
	      mq))
      (guard (c ((and (eq? 'system-error (exception-kind c))
		      (= EPIPE (car (list-ref (exception-args c) 3))))
		 (values)))
	(let loop ()
	  ;; Doing 'wait!' or 'send-round' the other way around
	  ;; should be acceptable as well.
	  (send-round)
	  ;; If 'output' is buffered, make sure bytes don't just sit
	  ;; in the buffer forever.  Don't flush after each individual
	  ;; envelope for performance.  TODO: should connect-unix enable
	  ;; buffering?
	  (force-output output)
	  (wait!)
	  (loop))))

    ;; See, e.g., the Linux man page path_resolution(7).
    (define %path-resolution-errors
      (list EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG))

    (define (connect-unix config service-name sleep)
      "Try connecting to the server using UNIX domain sockets.

On success, the socket is returned.  If the server has bound
the socket but is not yet listening, wait a little and retry.
If the socket file does not yet exist, wait until it does exist
and retry.  It is assumed the file name of the socket is set
in the configuration @var{config}.  If it is not set there,
an appropriate @code{&undefined-key-error} is raised."
      ;; TODO: use a mechanism like 'inotify' instead of sleeping
      ;; when the socket file does not exist.
      (let^ ((! unix-path
		(read-value identity config service-name "UNIXPATH"))
	     (! address (make-socket-address PF_UNIX unix-path))
	     ;; TODO: maybe catch ENOMEM, ENOBUFS, EMFILE ...
	     (! socket (socket PF_UNIX SOCK_STREAM 0))
	     (_ (fcntl socket F_SETFD
		       (bitwise-ior (fcntl socket F_GETFD) FD_CLOEXEC)))
	     (_ (fcntl socket F_SETFL
		       (bitwise-ior (fcntl socket F_GETFL) O_NONBLOCK)))
	     ;; Grrr why can't we just use 'select' on socket to wait
	     ;; for the connection to complete, like with Internet sockets?
	     (/o/ retry (timeout (standard-back-off 0)))
	     (! (retry)
		(sleep (/ timeout time-unit:second))
		(retry (standard-back-off timeout)))
	     ;; 'system-error-errno' returns #f if 'condition'
	     ;; is not a 'system-error'.
	     (! (eagain? errno)
		(= EAGAIN errno))
	     (! (connection-refused? errno)
		(= ECONNREFUSED errno))
	     (! (path-resolution-error? errno)
		(memv errno %path-resolution-errors))
	     (! (wrong-type-socket-error? errno)
		(= EPROTOTYPE errno))
	     (! (retry-errno? errno)
		;; On Linux, EAGAIN can happen if the receive queue
		;; of the listening socket is full.  I.e., the listening
		;; is being connected to more frequently than the corresponding
		;; proces can keep up with.  This situation should resolve
		;; itself automatically.
		(or (eagain? errno)
		    ;; This can happen if the listening socket is
		    ;; not actually listening yet.  Give the
		    ;; corresponding process a little more time.
		    (connection-refused? errno)
		    ;; See "connect-unix, will connect even if previous socket
		    ;; is different type" test case.
		    (wrong-type-socket-error? errno)
		    ;; Give the process implementing the process some
		    ;; time to set up directory structures, set the
		    ;; permissions appropriately ...
		    (path-resolution-error? errno)))
	     (! ok? (guard (c ((and (eq? (exception-kind c) 'system-error)
				    (retry-errno?
				     (car (list-ref (exception-args c) 3))))
			       #f))
		      (connect socket AF_UNIX unix-path))))
	    ;; Guile returns #f if SOCKET is non-blocking
	    ;; and the connection cannot be made immediately.
	    (if ok?
		socket
		(retry))))

    ;; See 'port->message-queue'.  Also used by connect/fibers.
    (define* (prepare-port-message-queue spawn)
      (define rcvar (make-repeated-condition))
      (define (interrupt! mq)
	(trigger-condition! rcvar))
      ;; 'closed-condition' is used to coordinate the termination of the
      ;; two fibers.  When one fiber detects an EOF condition (or half-duplex),
      ;; it informs the other fiber by signalling the condition and injects
      ;; an appropriate error, unless the other fiber will do it already.
      ;;
      ;; It is also used to determine which of the two fibers should close
      ;; the port.  The port is closed by the fiber for which signal-condition!
      ;; on closed-condition returns #f, as in that case, the other fiber has
      ;; already done all its I/O and won't need the port anymore.
      (define closed-condition (make-condition))
      ;; 'request-close-condition' is used by 'close!' to stop the read fiber,
      ;; such that it will signal 'closed-condition', then the 'write-fiber'
      ;; will also stop and close the port.  Closing it from the write fiber
      ;; would also be possible, but in this implementation, closing happens
      ;; in the read fiber.
      (define request-close-condition (make-condition))
      (define (start-reader! mq port)
	(define-values (key . rest)
	  (let/ec escape
	    (define wait-op
	      (choice-operation
	       (wait-until-port-readable-operation port)
	       (wrap-operation (wait-operation request-close-condition)
			       (lambda ()
				 (escape 'input:regular-end-of-file)))
	       (wrap-operation (wait-operation closed-condition)
			       (lambda ()
				 (escape 'input:regular-end-fof-file)))))
	    (define (new-waiter . _)
	      (perform-operation wait-op))
	    ;; XXX: if (define-values error ...) is written and
	    ;; 'handle-input!' raises an error (resulting in a backtrace),
	    ;; a segfault can
	    ;; happen: <https://debbugs.gnu.org/cgi/bugreport.cgi?bug=50153>.
	    (parameterize ((current-read-waiter new-waiter))
	      (handle-input! mq port))))
	(if (signal-condition! closed-condition)
	    (apply inject-error! mq key rest)
	    ;; TODO: close-port can block!
	    (close-port port)))
      (define (start-writer! mq port)
	(let/ec escape
	  ;; operation for calling the escape continuation when
	  ;; when the other fiber detected the connection is broken
	  (define escape-when-closed-operation
	    (wrap-operation (wait-operation closed-condition)
			    escape))
	  ;; operation for waiting until the port is writable
	  ;; or the other fiber detected the connection is broken.
	  (define wait-writable-operation
	    (choice-operation
	     escape-when-closed-operation
	     (wait-until-port-writable-operation port)))
	  (define (wait!)
	    (perform-operation
	     (choice-operation
	      (prepare-await-trigger! rcvar)
              ;; Don't wait for the port to be writable here!
              ;;
              ;; Otherwise, if the port is writable, but the message
              ;; queue has nothing buffered for a while, the fiber
              ;; keeps spinning.
              ;;
              ;; XXX it would be nice if it could be detected when the
              ;; write end of the port is closed.
	      escape-when-closed-operation)))
	  (define (wait!/blocking)
	    (perform-operation wait-writable-operation))
	  (define old-waiter (current-write-waiter))
	  (define (new-waiter p)
	    (if (eq? p port)
		(wait!/blocking)
		;; Maybe a backtrace is being printed,
		;; 'system-async-mark' is used ...
		(old-waiter p)))
	  (parameterize ((current-write-waiter new-waiter))
	    (handle-output! mq port wait!)))
	(if (signal-condition! closed-condition)
	    (inject-error! mq 'input:regular-end-of-file)
	    ;; TODO: close-port can block!
	    (close-port port)))
      (define (close!)
	(signal-condition! request-close-condition))
      (values (lambda (mq port)
		(spawn (lambda () (start-reader! mq port)))
		(spawn (lambda () (start-writer! mq port))))
	      ;; Pass this to make-message-queue as the 'sender'.
	      interrupt!
	      close!))

    (define* (port->message-queue port handlers error-handler
				  #:key (spawn spawn-fiber))
      "Create a message queue sending and receiving data over @var{port}.

This creates some fibers with @var{spawn} (@code{spawn-fiber} by default).
As such, @var{port} must be non-blocking if @code{spawn-fiber} is used.
All fibers will complete when the end-of-file has been encountered.

When the connection is broken, the error @code{input:regular-end-of-file}
is injected.  A half-duplex port is treated as a broken connection.
XXX: half-duplex connections cannot always be detected
XXX: Likewise for connect/fibers?

The port will be closed when the connection is broken or after
@ode{close-queue!} is called."
      (define-values (start-fibers interrupt! close!)
	(prepare-port-message-queue spawn))
      (define mq
	(make-message-queue handlers error-handler interrupt! close!))
      (start-fibers mq port)
      mq)

    (define* (connect/fibers config service-name handlers error-handler
			     #:key (spawn spawn-fiber))
      "Create a message queue that will be connected in the background
to a GNUnet service @var{service-name}.  The message queue can be used
before the message queue is connected; any send messages will be buffered
until they can be sent.  Some fibers may be created with @var{spawn}
(@code{spawn-fiber} by default).

When the connection has been established, the error @code{connection:connected}
(a symbol) is injected into the message queue.  When the connection has been
closed by the server (e.g. because the server was stopped or is restarting)
the error @code{input:regular-end-of-file} is injected into the message queue.

If the message queue is closed while establishing the connection, the error
@code{connection:interrupted} (a symbol) is injected and
@code{connection:connected} is not injected."
      (define-values (start-fibers interrupt! close!)
	(prepare-port-message-queue spawn))
      (define interrupt-condition (make-condition))
      (define (close*!)
	(signal-condition! interrupt-condition)
	(close!))
      (define mq
	(make-message-queue handlers error-handler interrupt! close*!))
      (spawn (lambda ()
	       ;; Use 'sleep*' to allow 'close*!' to stop the connection
	       ;; forming.
	       (define socket (let/ec escape
				(define allow-interrupt-operation
				  (wrap-operation
				   (wait-operation interrupt-condition)
				   (lambda () (escape #f))))
				(define (sleep* time)
				  (perform-operation
				   (choice-operation
				    (sleep-operation time)
				    allow-interrupt-operation)))
				(connect-unix config service-name sleep*)))
	       (if socket
		   (begin (inject-error! mq 'connection:connected)
			  (start-fibers mq socket))
		   (inject-error! mq 'connection:interrupted))))
      mq)))

